# Extra package locations: the dataset helper scripts and the caffe python bindings.
module_path = ['../../dataset_scripts', '../../../caffe/python']
import os
import sys
for _rel_path in module_path:
    _abs_path = os.path.abspath(os.path.join(_rel_path))
    if _abs_path not in sys.path:
        sys.path.append(_abs_path)
import cv2 as cv
import numpy as np
import scipy
import PIL.Image
import math
import caffe
import time
from config_reader import config_reader
import util
import copy
import matplotlib
%matplotlib inline
import pylab as plt
from scipy.spatial import distance
from scipy.ndimage.filters import gaussian_filter
from scipy.io import loadmat
from numpy import ma
import pickle
import cuhk_large
from sklearn.metrics import average_precision_score, precision_recall_curve
# Load the CUHK-SYSU ("CUHK large") person-search dataset and report the
# sizes of its test split: the full detection pool, the number of query
# persons, and the gallery size per query.
dataset_root = '../../dataset/cuhk_large/dataset'
dataset = cuhk_large.CUHK_Large(dataset_root)
test_pool_size = dataset.get_test_pool_size()
print('Total test pool size: %d' % (test_pool_size))
query_size = dataset.get_test_query_size()
gallery_size = dataset.get_test_query_gallery_size()
print('Total query size: %d with gallery size: %d' % (query_size, gallery_size))
| id | key point (heatmap) | limb (paf) |
|---|---|---|
| 0 | nose | neck -> r-hip (x) |
| 1 | neck | r-hip -> neck (y) |
| 2 | r-shoulder | r-hip -> r-knee (x) |
| 3 | r-elbow | r-knee -> r-hip (y) |
| 4 | r-wrist | r-knee -> r-ankle (x) |
| 5 | l-shoulder | r-ankle -> r-knee (y) |
| 6 | l-elbow | neck -> l-hip (x) |
| 7 | l-wrist | l-hip -> neck (y) |
| 8 | r-hip | l-hip -> l-knee (x) |
| 9 | r-knee | l-knee -> l-hip (y) |
| 10 | r-ankle | l-knee -> l-ankle (x) |
| 11 | l-hip | l-ankle -> l-knee (y) |
| 12 | l-knee | neck -> r-shoulder (x) |
| 13 | l-ankle | r-shoulder -> neck (y) |
| 14 | r-eye | r-shoulder -> r-elbow (x) |
| 15 | l-eye | r-elbow -> r-shoulder (y) |
| 16 | r-ear | r-elbow -> r-wrist (x) |
| 17 | l-ear | r-wrist -> r-elbow (y) |
| 18 | r-shoulder -> r-ear (x) | |
| 19 | r-ear -> r-shoulder (y) | |
| 20 | neck -> l-shoulder (x) | |
| 21 | l-shoulder -> neck (y) | |
| 22 | l-shoulder -> l-elbow (x) | |
| 23 | l-elbow -> l-shoulder (y) | |
| 24 | l-elbow -> l-wrist (x) | |
| 25 | l-wrist -> l-elbow (y) | |
| 26 | l-shoulder -> l-ear (x) | |
| 27 | l-ear -> l-shoulder (y) | |
| 28 | neck -> nose (x) | |
| 29 | nose -> neck (y) | |
| 30 | nose -> r-eye (x) | |
| 31 | r-eye -> nose (y) | |
| 32 | nose -> l-eye (x) | |
| 33 | l-eye -> nose (y) | |
| 34 | r-eye -> r-ear (x) | |
| 35 | r-ear -> r-eye (y) | |
| 36 | l-eye -> l-ear (x) | |
| 37 | l-ear -> l-eye (y) | |
# Read the pose-estimation settings via the project's config_reader helper:
# `param` holds inference parameters (scales, thresholds, GPU choice),
# `model` holds the network spec (prototxt/caffemodel paths, stride, boxsize).
param, model = config_reader()
print(param)
print(model)
# Load the pretrained pose caffe network in inference (TEST) mode.
net = caffe.Net(model['deployFile'], model['caffemodel'], caffe.TEST)
key_point_count = 19  # 18 body keypoints + 1 background heatmap channel
limb_count = 38  # 19 limbs x 2 PAF channels (x and y components)
mid_num = 10  # number of points sampled along a candidate limb when scoring it
# find connection in the specified sequence, center 29 is in the position 15
# Each pair is (part A, part B) in 1-based keypoint ids.
limbSeq = [[2,3], [2,6], [3,4], [4,5], [6,7], [7,8], [2,9], [9,10], \
[10,11], [2,12], [12,13], [13,14], [2,1], [1,15], [15,17], \
[1,16], [16,18], [3,17], [6,18]]
# the middle joints heatmap correspondence: for each limb in limbSeq, the pair
# of network output channels holding its PAF x/y components (offset by the 19
# heatmap channels; subtract key_point_count to index into paf_avg).
mapIdx = [[31,32], [39,40], [33,34], [35,36], [41,42], [43,44], [19,20], [21,22], \
[23,24], [25,26], [27,28], [29,30], [47,48], [49,50], [53,54], [51,52], \
[55,56], [37,38], [45,46]]
# [12,13], [20,21], [14,15], [16,17], [22,23], [24,25], [0,1], [2,3],
# [4,5], [6,7], [8,9], [10,11], [28,29], [30,31], [34,35], [32,33],
# [36,37], [18,19], [26,27]
def find_heatmap_and_paf(oriImg):
    """Run the pose network on the image at several scales and return the
    keypoint heatmaps and part-affinity fields averaged over scales.

    oriImg: BGR image array of shape (H, W, 3) as loaded by cv.imread.
    Returns (heatmap_avg, paf_avg) with shapes (H, W, 19) and (H, W, 38).
    """
    # Select the caffe execution mode from the configuration.
    if param['use_gpu']:
        caffe.set_mode_gpu()
        caffe.set_device(param['GPUdeviceNumber'])  # set to your device!
    else:
        caffe.set_mode_cpu()

    # Scales are relative to the network box size and the larger image side.
    multiplier = [s * model['boxsize'] / np.max(oriImg.shape) for s in param['scale_search']]

    heatmap_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], key_point_count))
    paf_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], limb_count))

    for scale in multiplier:
        resized = cv.resize(oriImg, (0,0), fx=scale, fy=scale, interpolation=cv.INTER_CUBIC)
        padded, pad = util.padRightDownCorner(resized, model['stride'], model['padValue'])

        # Feed the image as a 1x3xHxW blob scaled to roughly [-0.5, 0.5).
        net.blobs['data'].reshape(*(1, 3, padded.shape[0], padded.shape[1]))
        net.blobs['data'].data[...] = np.transpose(
            np.float32(padded[:,:,:,np.newaxis]), (3,2,0,1)) / 256 - 0.5
        output_blobs = net.forward()

        # output 1 is heatmaps: upsample by stride, strip padding, match input size.
        heatmap = np.transpose(np.squeeze(net.blobs[list(output_blobs.keys())[1]].data), (1,2,0))
        heatmap = cv.resize(heatmap, (0,0), fx=model['stride'], fy=model['stride'], interpolation=cv.INTER_CUBIC)
        heatmap = heatmap[:padded.shape[0]-pad[2], :padded.shape[1]-pad[3], :]
        heatmap = cv.resize(heatmap, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv.INTER_CUBIC)

        # output 0 is PAFs: same post-processing as the heatmaps.
        paf = np.transpose(np.squeeze(net.blobs[list(output_blobs.keys())[0]].data), (1,2,0))
        paf = cv.resize(paf, (0,0), fx=model['stride'], fy=model['stride'], interpolation=cv.INTER_CUBIC)
        paf = paf[:padded.shape[0]-pad[2], :padded.shape[1]-pad[3], :]
        paf = cv.resize(paf, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv.INTER_CUBIC)

        # Accumulate the average over all scales.
        heatmap_avg += heatmap / len(multiplier)
        paf_avg += paf / len(multiplier)

    return (heatmap_avg, paf_avg)
def find_all_peaks(heatmap_avg, thre1=None, num_parts=None):
    """Find local maxima (candidate keypoints) in every part heatmap.

    heatmap_avg: (H, W, >=num_parts) array of averaged keypoint heatmaps.
    thre1:       peak threshold on the smoothed map; defaults to param['thre1'].
    num_parts:   number of part channels to scan; defaults to the 18 body
                 parts (the last of the 19 channels is background).

    Returns (all_peaks, peak_counter) where all_peaks[part] is a list of
    (x, y, score, id) tuples with globally unique ids, and peak_counter is
    the total number of peaks found.
    """
    if thre1 is None:
        thre1 = param['thre1']
    if num_parts is None:
        num_parts = key_point_count - 1  # skip the background channel
    all_peaks = []
    peak_counter = 0
    for part in range(num_parts):
        map_ori = heatmap_avg[:, :, part]
        # Smooth first so noisy single-pixel maxima are suppressed.
        smoothed = gaussian_filter(map_ori, sigma=3)
        # Shifted copies (zero-padded); a peak must dominate its 4-neighbourhood.
        map_left = np.zeros(smoothed.shape)
        map_left[1:, :] = smoothed[:-1, :]
        map_right = np.zeros(smoothed.shape)
        map_right[:-1, :] = smoothed[1:, :]
        map_up = np.zeros(smoothed.shape)
        map_up[:, 1:] = smoothed[:, :-1]
        map_down = np.zeros(smoothed.shape)
        map_down[:, :-1] = smoothed[:, 1:]
        peaks_binary = np.logical_and.reduce(
            (smoothed >= map_left, smoothed >= map_right,
             smoothed >= map_up, smoothed >= map_down, smoothed > thre1))
        peak_rows, peak_cols = np.nonzero(peaks_binary)
        peaks = list(zip(peak_cols, peak_rows))  # note reverse: (x, y) order
        # Score each peak with the *unsmoothed* heatmap value and tag it with
        # an id used later to index the flattened candidate array.
        peaks_with_score_and_id = [
            peak + (map_ori[peak[1], peak[0]], peak_counter + offset)
            for offset, peak in enumerate(peaks)]
        all_peaks.append(peaks_with_score_and_id)
        peak_counter += len(peaks)
    return (all_peaks, peak_counter)
def find_subset_and_candidate(oriImg, paf_avg, all_peaks):
    """Group detected keypoint peaks into per-person skeletons.

    oriImg:    original image (only its height feeds the distance prior).
    paf_avg:   (H, W, 38) averaged part-affinity fields.
    all_peaks: per-part peak lists from find_all_peaks.

    Returns (subset, candidate):
      candidate: (num_peaks, 4) array of [x, y, score, id] rows.
      subset:    one row per person; columns 0..17 hold candidate ids (or -1
                 when the part is missing), column -2 the total score and
                 column -1 the number of assembled parts.
    """
    connection_all = []
    special_k = []  # limb types with no candidate on at least one end
    for k in range(len(mapIdx)):
        # PAF channels for this limb (mapIdx is offset by the 19 heatmap channels).
        score_mid = paf_avg[:,:,[x-key_point_count for x in mapIdx[k]]]
        candA = all_peaks[limbSeq[k][0]-1]
        candB = all_peaks[limbSeq[k][1]-1]
        nA = len(candA)
        nB = len(candB)
        if nA != 0 and nB != 0:
            connection_candidate = []
            for i in range(nA):
                for j in range(nB):
                    vec = np.subtract(candB[j][:2], candA[i][:2])
                    norm = math.sqrt(vec[0]*vec[0] + vec[1]*vec[1])
                    if norm <= 0:
                        # Coincident endpoints: limb direction undefined, skip pair.
                        continue
                    vec = np.divide(vec, norm)
                    # Sample the PAF at mid_num points along the A->B segment.
                    # BUGFIX: zip() is a one-shot iterator in Python 3 and was
                    # indexed/len()-ed below -- it must be materialized as a list.
                    startend = list(zip(np.linspace(candA[i][0], candB[j][0], num=mid_num), \
                                        np.linspace(candA[i][1], candB[j][1], num=mid_num)))
                    vec_x = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 0] \
                                      for I in range(len(startend))])
                    vec_y = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 1] \
                                      for I in range(len(startend))])
                    # Alignment of PAF vectors with the limb direction, plus a
                    # penalty for limbs longer than half the image height.
                    score_midpts = np.multiply(vec_x, vec[0]) + np.multiply(vec_y, vec[1])
                    score_with_dist_prior = sum(score_midpts)/len(score_midpts) + min(0.5*oriImg.shape[0]/norm-1, 0)
                    criterion1 = len(np.nonzero(score_midpts > param['thre2'])[0]) > 0.8 * len(score_midpts)
                    criterion2 = score_with_dist_prior > 0
                    if criterion1 and criterion2:
                        connection_candidate.append([i, j, score_with_dist_prior, score_with_dist_prior+candA[i][2]+candB[j][2]])
            # Greedily keep the best-scoring connections, each peak used at most once.
            connection_candidate = sorted(connection_candidate, key=lambda x: x[2], reverse=True)
            connection = np.zeros((0,5))
            for c in range(len(connection_candidate)):
                i, j, s = connection_candidate[c][0:3]
                if i not in connection[:,3] and j not in connection[:,4]:
                    connection = np.vstack([connection, [candA[i][3], candB[j][3], s, i, j]])
                    if len(connection) >= min(nA, nB):
                        break
            connection_all.append(connection)
        else:
            special_k.append(k)
            connection_all.append([])

    # last number in each row is the total parts number of that person
    # the second last number in each row is the score of the overall configuration
    subset = -1 * np.ones((0, 20))
    candidate = np.array([item for sublist in all_peaks for item in sublist])
    for k in range(len(mapIdx)):
        if k not in special_k:
            partAs = connection_all[k][:,0]
            partBs = connection_all[k][:,1]
            indexA, indexB = np.array(limbSeq[k]) - 1
            for i in range(len(connection_all[k])):
                # Find existing person rows already containing either endpoint.
                found = 0
                subset_idx = [-1, -1]
                for j in range(len(subset)):
                    if subset[j][indexA] == partAs[i] or subset[j][indexB] == partBs[i]:
                        subset_idx[found] = j
                        found += 1
                if found == 1:
                    j = subset_idx[0]
                    if subset[j][indexB] != partBs[i]:
                        subset[j][indexB] = partBs[i]
                        subset[j][-1] += 1
                        subset[j][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
                elif found == 2:  # if found 2 and disjoint, merge them
                    j1, j2 = subset_idx
                    membership = ((subset[j1]>=0).astype(int) + (subset[j2]>=0).astype(int))[:-2]
                    if len(np.nonzero(membership == 2)[0]) == 0:  # merge
                        subset[j1][:-2] += (subset[j2][:-2] + 1)
                        subset[j1][-2:] += subset[j2][-2:]
                        subset[j1][-2] += connection_all[k][i][2]
                        subset = np.delete(subset, j2, 0)
                    else:  # as like found == 1
                        subset[j1][indexB] = partBs[i]
                        subset[j1][-1] += 1
                        subset[j1][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
                # if find no partA in the subset, create a new subset
                elif not found and k < 17:
                    row = -1 * np.ones(20)
                    row[indexA] = partAs[i]
                    row[indexB] = partBs[i]
                    row[-1] = 2
                    row[-2] = sum(candidate[connection_all[k][i,:2].astype(int), 2]) + connection_all[k][i][2]
                    subset = np.vstack([subset, row])

    # delete some rows of subset which has few parts occur
    deleteIdx = []
    for k in range(len(subset)):
        if subset[k][-1] < 4 or subset[k][-2]/subset[k][-1] < 0.4:
            deleteIdx.append(k)
    subset = np.delete(subset, deleteIdx, axis=0)
    return (subset, candidate)
# Body regions used for part-based matching. 'body' is the enclosing union
# box computed from the other parts, so it has no keypoint list of its own:
# body_index deliberately has only three entries and dict(zip(...)) below
# drops 'body' from body_indices.
body_types = ['face', 'up', 'low', 'body']
# 'face', 'up', 'low'
# Keypoint ids (0-based) per region: face = nose/eyes/ears, up = shoulders/
# arms + hips, low = hips/knees/ankles (ids 8 and 11 are shared hip points).
body_index = [[0, 14, 15, 16, 17], [2, 3, 4, 5, 6, 7, 8, 11], [8, 9, 10, 11, 12, 13]]
body_indices = dict(zip(body_types, body_index))
def box_extend(box, ratio, limit):
    """Grow a [l, r, t, b] box by `ratio` of its extent on every side,
    clamping the result to the rectangle [0, limit[0]] x [0, limit[1]]."""
    left, right, top, bottom = box
    width = right - left
    height = bottom - top
    new_left = int(max(left - ratio * width, 0))
    new_right = int(min(right + ratio * width, limit[0]))
    new_top = int(max(top - ratio * height, 0))
    new_bottom = int(min(bottom + ratio * height, limit[1]))
    return [new_left, new_right, new_top, new_bottom]
def find_person_bounding_box(image_name):
    """Detect people in the image and return, per person, a dict mapping
    'face'/'up'/'low'/'body' to an [l, r, t, b] box ([0,0,0,0] when the
    part was not found).

    NOTE(review): candidate rows store (x, y, score, id), yet Y is read from
    column 0 and X from column 1 here, so l/r span image rows and t/b span
    image columns; the drawing code compensates by passing (t, l)/(b, r) to
    cv.rectangle -- confirm before reusing these boxes elsewhere.
    """
    oriImg = cv.imread(dataset.get_image_path(image_name))
    heatmap_avg, paf_avg = find_heatmap_and_paf(oriImg)
    all_peaks, _ = find_all_peaks(heatmap_avg)
    subset, candidate = find_subset_and_candidate(oriImg, paf_avg, all_peaks)

    persons = []
    for person in subset:
        bboxes = {part: [] for part in body_types}
        # Body-box accumulators: min-l, max-r, min-t, max-b.
        bboxes['body'] = [oriImg.shape[0], 0, oriImg.shape[1], 0]
        for part, part_ids in body_indices.items():
            l = r = t = b = 0
            idx = person[np.array(part_ids)]
            idx = np.delete(idx, np.argwhere(idx == -1))  # drop missing keypoints
            if len(idx) != 0:
                Y = candidate[idx.astype(int), 0]
                X = candidate[idx.astype(int), 1]
                l, r = int(np.min(X)), int(np.max(X))
                t, b = int(np.min(Y)), int(np.max(Y))
                if part == 'face':
                    # Square-ish face box centred on the mean face keypoint.
                    cx, cy = int(np.mean(X)), int(np.mean(Y))
                    d = np.mean([r - l, b - t])
                    l, r = int(cx - d), int(cx + d)
                    t, b = int(cy - d), int(cy + d)
                else:
                    l, r, t, b = box_extend([l, r, t, b], 0.2,
                                            (oriImg.shape[0], oriImg.shape[1]))
            bboxes[part] = [l, r, t, b]
            if l > 0 or r > 0 or t > 0 or b > 0:
                bboxes['body'][0] = min(bboxes['body'][0], l)
                bboxes['body'][1] = max(bboxes['body'][1], r)
                bboxes['body'][2] = min(bboxes['body'][2], t)
                bboxes['body'][3] = max(bboxes['body'][3], b)
        persons.append(bboxes)
    return persons
# Person bounding boxes for the whole test pool are expensive to compute,
# so they are cached on disk; recompute only when the pickle is missing.
pool_image_bboxes_name = 'cuhklarge-pool-bboxes.pickle'
pool_image_bboxes = {}
pool_image_bboxes_path = os.path.abspath(os.path.join(pool_image_bboxes_name))
if os.path.exists(pool_image_bboxes_path):
    with open(pool_image_bboxes_path, 'rb') as f:
        pool_image_bboxes = pickle.load(f)
    print('Load pool image bboxes from pickle')
else:
    start_time = time.time()
    for i in range(test_pool_size):
        image_name = dataset.get_test_image_name(i)
        pool_image_bboxes[image_name] = find_person_bounding_box(image_name)
    with open(pool_image_bboxes_path, 'wb') as f:
        pickle.dump(pool_image_bboxes, f)
    print('Total time to find bounding box: %.2f s.' % ((time.time() - start_time)))
def calculate_iou(boxA, boxB):
    """Intersection-over-union of two [l, r, t, b] boxes given in inclusive
    pixel coordinates (hence the +1 on every extent).

    Returns 0.0 when the boxes do not overlap.
    """
    # determine the (x, y)-coordinates of the intersection rectangle
    li = max(boxA[0], boxB[0])
    ri = min(boxA[1], boxB[1])
    ti = max(boxA[2], boxB[2])
    bi = min(boxA[3], boxB[3])
    # Clamp each extent at zero: for disjoint boxes both extents go negative
    # and their raw product would come out positive (a classic IoU bug).
    interArea = max(0, ri - li + 1) * max(0, bi - ti + 1)
    if interArea == 0:
        return 0.0
    # compute the area of both the prediction and ground-truth rectangles
    boxAArea = (boxA[1] - boxA[0] + 1) * (boxA[3] - boxA[2] + 1)
    boxBArea = (boxB[1] - boxB[0] + 1) * (boxB[3] - boxB[2] + 1)
    # intersection area divided by the union of the two areas
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou
def calculate_ioA(boxA, boxB):
    """Intersection area of [l, r, t, b] boxes A and B divided by the area
    of box A (inclusive pixel coordinates). Returns 0.0 when disjoint.
    """
    # determine the (x, y)-coordinates of the intersection rectangle
    li = max(boxA[0], boxB[0])
    ri = min(boxA[1], boxB[1])
    ti = max(boxA[2], boxB[2])
    bi = min(boxA[3], boxB[3])
    # Clamp each extent at zero: for disjoint boxes both extents go negative
    # and their raw product would come out positive.
    interArea = max(0, ri - li + 1) * max(0, bi - ti + 1)
    # area of box A (inclusive coordinates, hence the +1)
    boxAArea = (boxA[1] - boxA[0] + 1) * (boxA[3] - boxA[2] + 1)
    ioA = interArea / float(boxAArea)
    return ioA
def calculate_ioB(boxA, boxB):
    """Intersection area of [l, r, t, b] boxes A and B divided by the area
    of box B (inclusive pixel coordinates). Returns 0.0 when disjoint.
    """
    # determine the (x, y)-coordinates of the intersection rectangle
    li = max(boxA[0], boxB[0])
    ri = min(boxA[1], boxB[1])
    ti = max(boxA[2], boxB[2])
    bi = min(boxA[3], boxB[3])
    # Clamp each extent at zero: for disjoint boxes both extents go negative
    # and their raw product would come out positive.
    interArea = max(0, ri - li + 1) * max(0, bi - ti + 1)
    # area of box B (inclusive coordinates, hence the +1)
    boxBArea = (boxB[1] - boxB[0] + 1) * (boxB[3] - boxB[2] + 1)
    ioB = interArea / float(boxBArea)
    return ioB
def is_bounding_box_match_ground_truth(bbox, gtbox, threshold=0.5):
    """A detection matches when at least `threshold` of either box is
    covered by their intersection (intersection-over-area, both directions)."""
    coverage_of_detection = calculate_ioA(bbox, gtbox)
    coverage_of_ground_truth = calculate_ioB(bbox, gtbox)
    return coverage_of_detection >= threshold or coverage_of_ground_truth >= threshold
def draw_ground_truth_box(oriImg, gtbox):
    """Draw the ground-truth box on oriImg in red (in place); no-op when
    the sample has no ground-truth box.

    Boxes are [l, r, t, b]; the rectangle corners are passed as (t, l) and
    (b, r), matching the axis convention used by find_person_bounding_box.
    """
    if not len(gtbox):
        return
    l, r, t, b = gtbox
    cv.rectangle(oriImg, (t,l), (b,r), (0,0,255), 2)
def draw_found_bounding_box(oriImg, bbox):
    """Draw the 'face', 'up' and 'low' part boxes in green and the enclosing
    'body' box in blue on oriImg (in place)."""
    for part in ('face', 'up', 'low'):
        l, r, t, b = bbox[part]
        cv.rectangle(oriImg, (t,l), (b,r), (0,255,0), 2)
    l, r, t, b = bbox['body']
    cv.rectangle(oriImg, (t,l), (b,r), (255,0,0), 2)
class GalleryObject:
    """A gallery image plus the ground-truth box of the target person
    (empty when the person does not appear in this image).

    Detections and feature vectors are looked up in the module-level caches
    pool_image_bboxes / pool_image_vectors (the latter is presumably built
    in another cell -- it is not defined in this file).
    """

    # Per-image identifier and ground-truth [l, r, t, b] box.
    image_name = None
    gtbox = None

    def __init__(self, image_name, gtbox):
        self.image_name = image_name
        self.gtbox = gtbox

    def get_found_bboxes(self):
        return pool_image_bboxes[self.image_name]

    def get_found_bbox(self, index):
        return self.get_found_bboxes()[index]

    def get_found_vectors(self):
        return pool_image_vectors[self.image_name]

    def get_found_vector(self, index):
        return self.get_found_vectors()[index]

    def draw(self):
        """Show the image with all detections and the ground-truth box (red)."""
        fig, axis = plt.subplots(1, 1)
        fig.set_size_inches((20, 20))
        img = cv.imread(dataset.get_image_path(self.image_name))
        draw_ground_truth_box(img, self.gtbox)
        for box in self.get_found_bboxes():
            draw_found_bounding_box(img, box)
        axis.imshow(img[:,:,[2,1,0]])  # BGR -> RGB for matplotlib
        axis.set_title(self.image_name)
class QueryObject:
    """The query image of a test case: on construction, finds which cached
    detection (if any) overlaps the ground-truth box of the queried person.

    matched_query_id stays None when no detection matched, in which case the
    sample cannot be evaluated.
    """

    # Per-image identifier, ground-truth box, and index of the matching detection.
    image_name = None
    gtbox = None
    matched_query_id = None

    def __init__(self, image_name, gtbox):
        self.image_name = image_name
        self.gtbox = gtbox
        for i, bbox in enumerate(self.get_found_bboxes()):
            if is_bounding_box_match_ground_truth(bbox['body'], self.gtbox):
                self.matched_query_id = i
                break

    def get_found_bboxes(self):
        return pool_image_bboxes[self.image_name]

    def get_query_bbox(self):
        """Part boxes of the matched detection, or None when nothing matched."""
        if self.matched_query_id is None:
            return None
        return pool_image_bboxes[self.image_name][self.matched_query_id]

    def get_found_vectors(self):
        return pool_image_vectors[self.image_name]

    def get_query_vector(self):
        """Feature vectors of the matched detection, or None when nothing matched."""
        if self.matched_query_id is None:
            return None
        return pool_image_vectors[self.image_name][self.matched_query_id]

    def draw(self):
        """Left: all detections. Right: ground truth plus the matched detection."""
        fig, axes = plt.subplots(1, 2)
        fig.set_size_inches((20, 20))

        img = cv.imread(dataset.get_image_path(self.image_name))
        for box in self.get_found_bboxes():
            draw_found_bounding_box(img, box)
        axes[0].imshow(img[:,:,[2,1,0]])
        axes[0].set_title(self.image_name)

        img = cv.imread(dataset.get_image_path(self.image_name))
        draw_ground_truth_box(img, self.gtbox)
        matched = self.get_query_bbox()
        if matched is not None:
            draw_found_bounding_box(img, matched)
        axes[1].imshow(img[:,:,[2,1,0]])
        axes[1].set_title(self.image_name)
# ['face', 'up', 'low', 'body']
# Per-part weights for combining part similarities. Only three weights are
# supplied, so zip() drops 'body' from the dict -- it never contributes
# directly -- and the 0.0 face weight disables face similarity in the final
# score (presumably intentional; confirm if face matching is ever enabled).
similarity_weight = dict(zip(body_types, [0.0, 1.0, 1.0]))
def find_similarity(query_vector, gallery_vector, parts=None):
    """Per-part average cosine similarity between query and gallery features.

    query_vector / gallery_vector: dicts mapping a part name to a list of
    feature vectors (one per embedding method).
    parts: part names to compare; defaults to the module-level body_types.

    'body' is always skipped; a part with no features on either side maps to
    None. Returns a dict part -> mean cosine similarity (or None).
    """
    if parts is None:
        parts = body_types
    similarity = dict(zip(parts, [0] * len(parts)))
    for part in parts:
        if part == 'body':
            continue  # the union box carries no feature vectors
        query_feats = query_vector[part]
        gallery_feats = gallery_vector[part]
        if len(query_feats) == 0 or len(gallery_feats) == 0:
            # No features available for this part on one of the sides.
            similarity[part] = None
        else:
            # Mean cosine similarity over all embedding methods.
            total = 0.0
            for m in range(len(query_feats)):
                total += 1.0 - distance.cosine(query_feats[m], gallery_feats[m])
            similarity[part] = total / len(query_feats)
    return similarity
def find_final_similarity(similarity, weights=None):
    """Weighted average of the per-part similarities, ignoring parts whose
    similarity is None.

    similarity: dict part -> similarity value or None (from find_similarity).
    weights:    dict part -> weight; defaults to module-level similarity_weight.

    Returns 0 when no weighted part is available.
    """
    if weights is None:
        weights = similarity_weight
    result = 0
    count = 0
    for part in weights:
        if similarity[part] is not None:
            result += similarity[part] * weights[part]
            count += weights[part]
    if count > 0:
        result /= count
    return result
class TestSample:
    """One person-search test case: a query person plus its fixed-size gallery.

    Evaluation follows the person-search protocol: every detection in every
    gallery image is scored against the query, and average precision is
    rescaled by the detection recall so missed ground truths hurt the score.
    """

    query_obj = None
    gallery_objs = None

    def __init__(self, index):
        query_data = dataset.get_test_query_query_data(index)
        self.query_obj = QueryObject(query_data.imname, query_data.idlocate)
        self.gallery_objs = []
        for i in range(gallery_size):
            gallery_data = dataset.get_test_query_gallery_data(index, i)
            self.gallery_objs.append(GalleryObject(gallery_data.imname, gallery_data.idlocate))

    def _evaluate(self, topk):
        """Score every gallery detection against the query.

        topk: iterable of cutoffs k for the top-k match accuracies.
        Returns (ap, acc) where acc[i] is 1 if a true match appears in the
        top topk[i] detections (over the whole gallery) and 0 otherwise.
        """
        y_true = []
        y_score = []
        imgs = []
        rois = []
        count_gt = 0  # gallery images that actually contain the person
        count_tp = 0  # detections matching a ground-truth box
        for gobj in self.gallery_objs:
            image_name = gobj.image_name
            gt = np.array(gobj.gtbox, dtype=np.int32)
            count_gt += (gt.size > 0)
            found_number = len(gobj.get_found_vectors())
            bbox = np.zeros((found_number, 4), dtype=np.int32)
            # BUGFIX: the deprecated np.float alias was removed in NumPy 1.24;
            # the builtin float is the documented replacement.
            sim = np.zeros(found_number, dtype=float)
            label = np.zeros(found_number, dtype=np.int32)
            for i in range(found_number):
                s = find_similarity(self.query_obj.get_query_vector(), gobj.get_found_vector(i))
                sim[i] = find_final_similarity(s)
                bbox[i, :] = np.array(gobj.get_found_bbox(i)['body'])
            if gt.size > 0:
                # Sort detections by similarity, then flag those matching the GT.
                inds = np.argsort(sim)[::-1]
                sim = sim[inds]
                bbox = bbox[inds]
                for j in range(found_number):
                    if is_bounding_box_match_ground_truth(bbox[j], gobj.gtbox):
                        label[j] = 1
                        count_tp += 1
            y_true.extend(list(label))
            y_score.extend(list(sim))
            imgs.extend([image_name] * found_number)
            rois.extend(list(bbox))
        y_true = np.asarray(y_true)
        y_score = np.asarray(y_score)
        # AP rescaled by detection recall (person-search convention).
        recall_rate = float(count_tp) / count_gt
        ap = 0 if count_tp == 0 else average_precision_score(y_true, y_score) * recall_rate
        inds = np.argsort(y_score)[::-1]
        y_score = y_score[inds]
        y_true = y_true[inds]
        acc = [min(1, sum(y_true[:k])) for k in topk]
        return (ap, acc)

    def evaluate(self, topk):
        """Evaluate this sample; returns NaNs when the query person was not
        matched by any detection (no query features to compare against)."""
        if self.query_obj.matched_query_id is None:
            ap = np.nan
            acc = [np.nan] * len(topk)
            return (ap, acc)
        return self._evaluate(topk)
# Sanity check: display every query whose ground-truth person was not matched
# by any detected bounding box (these samples would evaluate to NaN).
for query_index in range(query_size):
    test_sample = TestSample(query_index)
    query = test_sample.query_obj
    if query.matched_query_id is None:
        query.draw()